# Creating a Workflow

The simplest way to define a workflow is by using the `worker.RegisterWorkflow` method. This method is passed the workflow definition, which includes triggers for the workflow and the steps that the workflow should execute. For example, to trigger a workflow on the `user:created` event, you can do the following:

```go
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "simple-workflow",
        Description: "Simple one-step workflow.",
        On:          worker.Event("user:created"),
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                return nil
            }),
        },
    },
)
```

**Supported Workflow Triggers**

- `worker.Event` - triggers a workflow when an event is received
- `worker.Events` - triggers a workflow when any of the given events are received
- `worker.Cron` - triggers a workflow on a cron schedule
- `worker.Crons` - triggers a workflow from multiple cron schedules
- `worker.At` - triggers a workflow at a specific time. This is useful for one-off workflows. You can also schedule workflows using the `Admin` API - see [here](https://docs.hatchet.run/go-sdk/scheduling-workflows) for more information.

**Supported Workflow Definitions**

- `*worker.WorkflowJob` - a workflow that executes a series of steps
- `worker.Fn` - a single-step workflow ([see below](#single-step-workflows))

## Single-Step Workflows

If your workflow is a single method that is only triggered via API, you can use `worker.Fn` to define your workflow:

```go
w.RegisterWorkflow(
    worker.Fn(func(ctx worker.HatchetContext) error {
        return nil
    }),
)
```

Anonymous functions will be given an auto-generated name based on the package and parent function name. To avoid ugly auto-generated names, you can use `SetName` on the `worker.Fn` struct:

```go
w.RegisterWorkflow(
    worker.Fn(func(ctx worker.HatchetContext) error {
        return nil
    }).SetName("post-user-create"), // this workflow will be named "post-user-create"
)
```

## Multi-Step Workflows

Hatchet supports generating multi-step workflows by specifying parent steps as dependencies. Any parents declared as dependencies will be executed before the current step, and their output data will be available to the current step. For example, the following workflow declares two steps, `step-one` and `step-two`. `step-two` depends on `step-one`, so `step-one` will be executed first, and its output will be passed to `step-two`:

```go
type stepOneOutput struct {
    Message string `json:"message"`
}

err = w.RegisterWorkflow(

	&worker.WorkflowJob{
		Name:        "two-step-workflow",
		Description: "This is an example two-step workflow.",
        On:          worker.Events("user:create"),
		Steps: []*worker.WorkflowStep{
			worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
				input := &userCreateEvent{}
                ctx.WorkflowInput(input) // typically you would handle this error

				return &stepOneOutput{
					Message: "Username is: " + input.Username,
				}, nil
			}).SetName("step-one"),
			worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
				input := &stepOneOutput{}
				ctx.StepOutput("step-one", input) // typically you would handle this error

				return &stepOneOutput{
					Message: "Above message is: " + input.Message,
				}, nil
			}).SetName("step-two").AddParents("step-one"), // note the usage of `AddParents`
		},
	},
)
```

## Getting Access to the Input Data

You can get access to the workflow's input data, such as the event data or other specified input data, by using the `WorkflowInput` method on the `HatchetContext`. For example, given the following event:

```go
type MyEvent struct {
    Name string `json:"name"`
}
```

You can get access to the event data by doing the following:

```go
func FirstStep(ctx worker.HatchetContext) error {
    event := &MyEvent{}
    err := ctx.WorkflowInput(event)

    if err != nil {
        return err
    }

    fmt.Println("got event: ", event.Name)
    return nil
}
```

## Step Function Signatures

Step functions must always accept a `worker.HatchetContext` as the first argument (or alternatively, `context.Context`), and must return an `error` as the last return value. They can optionally return a value, which must be a pointer to a struct. At the moment, the following are valid step functions:

```go
func (ctx worker.HatchetContext) error
func (ctx worker.HatchetContext) (*myOutput, error)
```

> Why pointers to structs? We use JSON marshalling/unmarshalling under the hood, and pointers to structs are the most predictable way to marshal and unmarshal values. You can use `json` tags and `MarshalJSON` + `UnmarshalJSON` methods to customize the marshalling/unmarshalling behavior.

## Services

Services are a way to logically group workflows into different categories. For example, you may have a `user` service that contains all workflows related to users. You can define a service by using the `worker.NewService` method. For example, to define a `user` service, you can do the following:

```go
userService := w.NewService("user")

userService.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "post-user-sign-up",
        On:          worker.Event("user:created"),
        Description: "Workflow that executes after a user signs up.",
        Timeout:     "60s",
        Steps: []*worker.WorkflowStep{
            {
                Function: func(ctx context.Context) error {
                    fmt.Println("running post-user sign up")
                    return nil
                },
            },
        },
    },
)
```

While this is mostly a convenience method at the moment, we plan to add more features to services in the future, like service-level metrics and service-level retries.

## Concurrency Limits and Fairness

> **Note:** this feature is currently in beta, and currently only supports a concurrency strategy which terminates the oldest running workflow run to make room for the new one. This will be expanded in the future to support other strategies.\*\*

By default, there are no concurrency limits for Hatchet workflows. Workflow runs are immediately executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by setting the `Concurrency` field on the `WorkflowJob` struct. You can use `worker.Concurrency` and pass in a function with a signature `func (ctx worker.HatchetContext) (string, error)`. This function returns a **concurrency group key**, which is a string that is used to group concurrent executions. For example, the following workflow will only allow 5 concurrent executions for any workflow execution of `concurrency-limit`, since the key is statically set to `my-key`:

```go
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
	return "my-key", nil
}

err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "concurrency-limit",
        On:          worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 1 run at a time.",
        Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)
```

### Use-Case: Enforcing Per-User Concurrency Limits

You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:

```go
type MyUser struct {
    UserId string `json:"user_id"`
}

func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
	event := &MyEvent{}
    err := ctx.WorkflowInput(event)

    if err != nil {
        return "", err
    }

    return event.UserId, nil
}

err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "concurrency-limit-per-user",
        On:          worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 1 run at a time per user.",
        Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)
```

## Cron Schedules

You can declare a cron schedule by passing `worker.Cron` to the `worker.RegisterWorkflow` method. For example, to trigger a workflow every 5 minutes, you can do the following:

```go
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "my-cron-job",
        On:          worker.Cron("*/5 * * * *"),
        Description: "Cron workflow example.",
        Timeout:     "60s",
        Steps: []*worker.WorkflowStep{
            {
                Function: func(ctx context.Context) error {
                    fmt.Println("triggered at:", time.Now())
                    return nil
                },
            },
        },
    },
)
```

## Middleware

You can define middleware that will be executed before and after each step function. Middleware functions have the following signature:

```go
func(ctx context.Context, next func(context.Context) error) error
```

You can register this middleware globally (at the worker level) or at the service level, using `worker.Use` and `service.Use`, respectively. For example, to define a middleware that logs the start and end of each step function, you can do the following:

```go
w.Use(func(ctx context.Context, next func(context.Context) error) error {
    // time the function duration
    start := time.Now()
    err := next(ctx)
    duration := time.Since(start)
    fmt.Printf("step function took %s\n", duration)
    return err
})
```

You can also use the middleware to add values to the context. For example:

```go
w.Use(func(ctx context.Context, next func(context.Context) error) error {
    err := next(context.WithValue(ctx, "testkey", "testvalue"))

    if err != nil {
        return fmt.Errorf("error in middleware: %w", err)
    }

    return nil
})
```

## Re-using Actions

If you have a common set of steps that you want to re-use across multiple workflows, you can define use `RegisterAction` on either a service or a worker. For example, to define a `send-email` action:

```go
testSvc := w.NewService("test")

err = testSvc.RegisterAction(StepOne, worker.WithActionName("step-one"))

if err != nil {
	panic(err)
}

err = testSvc.RegisterWorkflow(
	testSvc.Call("step-one"),
)
```

Note the usage of `testSvc.Call("step-one")` to invoke a single-step action.
